package com.cscec8b.ct.common.dao;

import com.cscec8b.ct.common.api.Column;
import com.cscec8b.ct.common.api.RowKey;
import com.cscec8b.ct.common.api.TableRef;
import com.cscec8b.ct.common.util.DataUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;


import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;


/**
 * @Copyright: Shanghai Definesys Company.All rights reserved.
 * @Description:
 * @author: chuhaitao
 * @since: 2019/1/27 20:38
 * @history: 1.2019/1/27 created by chuhaitao
 */
public class BaseDao {

    ThreadLocal<Connection> localConn = new ThreadLocal<Connection>();
    ThreadLocal<Admin> localAdmin = new ThreadLocal<Admin>();

    /**
     * 开始获取 Connection 和Admin 对象，放入到当前线程中
     *
     * @throws Exception
     */
    protected void start() throws Exception {
        getConnection();
        getAdmin();
    }

    /**
     * 关闭连接
     *
     * @throws Exception
     */
    protected void end() throws Exception {
        Admin admin = getAdmin();

        if (admin != null) {
            admin.close();
            localAdmin.remove();
        }
        Connection connection = getConnection();
        if (connection != null) {
            connection.close();
            localConn.remove();
        }
    }
    /*
       1、获取连接
       2、创建命名空间
       3、创建表
     */

    /**
     * 获取连接HBase的连接，如果当前线程中存在，则从当前线程中获取
     *
     * @return
     * @throws Exception
     */
    protected synchronized Connection getConnection() throws Exception {
        Connection connection = localConn.get();
        if (connection == null) {
            Configuration configuration = HBaseConfiguration.create();
            connection = ConnectionFactory.createConnection(configuration);
            localConn.set(connection);
        }
        return connection;
    }

    /**
     * 获取admin
     *
     * @return
     * @throws Exception
     */
    protected synchronized Admin getAdmin() throws Exception {
        Admin admin = localAdmin.get();
        if (admin == null) {
            Connection connection = getConnection();
            admin = connection.getAdmin();
            localAdmin.set(admin);
        }
        return admin;
    }

    /**
     * 1、创建表 如果表存在，先删除，后创建
     * 2、可添加预分区
     *
     * @param name
     */
    protected void createTableXX(String name, Integer regionCount, String coprocessorName, String... familys) throws Exception {
        Admin admin = getAdmin();
        TableName tableName = TableName.valueOf(name);
        if (admin.tableExists(tableName)) {
            deleteTable(name);
        }
        createTable(name, familys, regionCount, coprocessorName);
    }

    /**
     * 创建表，不添加预分区
     *
     * @param name
     */
    protected void createTableXX(String name, String coprocessorName, String... familys) throws Exception {
        createTableXX(name, null, coprocessorName, familys);
    }


    /**
     * 删除表
     *
     * @param name
     * @throws Exception
     */
    protected void deleteTable(String name) throws Exception {
        Admin admin = getAdmin();
        admin.disableTable(TableName.valueOf(name));
        admin.deleteTable(TableName.valueOf(name));
    }

    private void createTable(String name, String[] familys, Integer regionCount, String coprocessorName) throws Exception {
        Admin admin = getAdmin();
        //表描述器
        HTableDescriptor descriptor = new HTableDescriptor(name);
        /**如果列族为空 默认 info*/
        if (familys == null || familys.length == 0) {
            familys = new String[]{"info"};
        }
        //天机列描述器
        for (String family : familys) {
            HColumnDescriptor columnDescriptor = new HColumnDescriptor(family);
            descriptor.addFamily(columnDescriptor);
        }
        //添加协处理器
        if (coprocessorName != null && !"".equals(coprocessorName)) {
            descriptor.addCoprocessor(coprocessorName);
        }

        if (regionCount == null || regionCount <= 1) {
            admin.createTable(descriptor);
        } else {
            //获取 分区键
            byte[][] splitKeys = getSplitKeys(regionCount);
            admin.createTable(descriptor, splitKeys);
        }
    }

    /**
     * 获取预分区的键
     *
     * @param regionCount
     * @return
     */
    private byte[][] getSplitKeys(Integer regionCount) {
        //几行几列 ，5行
        // (-∞，0|) ,[0，1|) , [1，2|) , [2|，3|) ,[3|，4|), [4|，+∞|)

        byte[][] bs = new byte[regionCount - 1][];
        List<byte[]> bsList = new ArrayList<byte[]>();
        for (int i = 0; i < regionCount - 1; i++) {
            String key = i + "|";
            bsList.add(Bytes.toBytes(key));
        }
        bsList.toArray(bs);
        return bs;
    }


    /**
     * 1.0 插入数据
     *
     * @param name
     * @param put
     * @throws Exception
     */
    protected void putData(String name, Put put) throws Exception {

        Connection connection = getConnection();
        Table table = connection.getTable(TableName.valueOf(name));
        //放入数据
        table.put(put);

        table.close();

    }

    /**
     * 使用反射的方式动态的添加对象
     *
     * @param obj
     * @throws Exception
     */
    protected void putData(Object obj) throws Exception {

        Class clazz = obj.getClass();
        TableRef ref = (TableRef) clazz.getAnnotation(TableRef.class);
        String name = ref.value();

        Connection connection = getConnection();
        Table table = connection.getTable(TableName.valueOf(name));
        //放入数据

        Field[] fields = clazz.getDeclaredFields();
        String rowKeyValue = "";

        for (Field field : fields) {
            RowKey rowKey = field.getAnnotation(RowKey.class);
            if (rowKey != null) {
                //防止属性私有
                field.setAccessible(true);
                rowKeyValue = (String) field.get(obj);
                break;
            }
        }
        Put put = new Put(Bytes.toBytes(rowKeyValue));


        for (Field field : fields) {
            Column column = field.getAnnotation(Column.class);
            if (column != null) {
                field.setAccessible(true);
                String family = column.family();
                String col = column.column();
                if ("".equals(col)) {
                    col = field.getName();
                }
                put.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), Bytes.toBytes((String) field.get(obj)));
            }
        }


        System.out.println("put rowKey---" +  Bytes.toString(put.getRow()));
        table.put(put);
        table.close();

    }


    /**
     * 创建命名空间
     *
     * @param name
     */
    protected void createNameSpaceNX(String name) throws Exception {
        Admin admin = getAdmin();

        try {
            //根据名称获取命名空间描述器，未获取到创建命名空间
            admin.getNamespaceDescriptor(name);
        } catch (NamespaceNotFoundException e) {
            NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor
                    .create(name)
                    .build();
            admin.createNamespace(namespaceDescriptor);
        }

    }


    /**
     * 计算分区号
     * 要求每个人的某年某月的数据放入同一个region中
     *
     * @param tel
     * @param date
     * @return
     */
    protected int getRegionNum(String tel, String date) {
        //从电话中获取用户的编码
        String userCode = tel.substring(tel.length() - 4);
        //截取年月
        String dateCode = date.substring(0, 6);
        int userCodeHash = userCode.hashCode();
        int dateCodeHash = dateCode.hashCode();
        //crc校验采用异或算法
        int crc = Math.abs(userCodeHash ^ dateCodeHash);
        //取模
        int r = crc % 6;
        return r;
    }


    /**
     * 1、需求：获取指定区号，指定时间范围内的所有的集合
     * 获取需要扫描的 rowkey集合
     * 分区获取，每一个区单独的获取
     * <p>
     * rowKey的格式： 5_19683537146_20180423195725_17885275338_0
     * 5_19683537146_201804   ~ 5_19683537146_201804|
     *
     * @param tel
     * @param start
     * @param end
     * @return
     */
    protected List<String[]> getRowKey(String tel, String start, String end) {
        List<String[]> rowKeys = new ArrayList<String[]>();

        //获取分区
        String startTime = start.substring(0, 6);
        String endTime = end.substring(0, 6);
        Calendar startCalendar = Calendar.getInstance();
        Calendar endCalendar = Calendar.getInstance();

        startCalendar.setTime(DataUtil.parse(startTime, "yyyyMM"));
        endCalendar.setTime(DataUtil.parse(endTime, "yyyyMM"));
        while (startCalendar.getTimeInMillis() <= endCalendar.getTimeInMillis()) {
            String nowTime = DataUtil.format(startCalendar.getTime(), "yyyyMM");
            int rowKeyPre = getRegionNum(tel, nowTime);
            String starKey = rowKeyPre + "_" + tel + "_" + nowTime;
            String[] r1 = new String[]{starKey, starKey + "|"};
            rowKeys.add(r1);
            //月份+1
            startCalendar.add(Calendar.MONTH, 1);
        }
        return rowKeys;
    }


    public static void main(String[] args) {
        //  getSplitKeys(6);

        // getRegionNum("13956643892", "20190210162800");

        // List<String []> r = getRowKey("19683537146", "20190210162800", "20190410162800");

        //  System.out.println(r.get(0)[0]+"-"+r.get(0)[1]);
    }

}
